"""
Case Type   : 基础功能
Case Name   : 发布表存在唯一约束
Create At   : 2022/1/26
Owner       : opentestcase026
Description :
    1.在两个集群创建表
    2.创建发布端
    3.创建订阅
    4.修改表数据
    5.查询是否同步
    6.集群B插入数据
    7.集群A插入数据
    8.集群A再次插入数据
    9.查询是否同步
    10.删除集群B冲突数据，再查询
Expect      :
    1.成功
    2.成功
    3.成功
    4.成功
    5.同步
    6.成功
    7.成功，但集群B提示冲突
    8.成功
    9.未更新
    10.之前操作被同步
History     :
    modified：2022-4-1 by 5328126;修改pg_log目录，避免其他日志影响
    Modified by opentestcase026 2022/5/5:增加等待5.5s，适配最新代码
    Modified by opentestcase043 2022/8/4:增加基础数据同步功能后订阅端有基础数据的同
    时再在订阅端修改表数据,主键冲突,关闭基础数据同步
    Modified by opentestcase012 2023/4/20:修改新的pg_log目录
"""
import unittest
import os
from yat.test import macro
from yat.test import Node
from testcase.utils.Logger import Logger
from testcase.utils.CommonSH import CommonSH
from testcase.utils.Common import Common
from testcase.utils.Constant import Constant

# Shell helper bound to the 'PrimaryDbUser' node. It is created at import
# time because the class-level skipIf decorator below calls get_node_num()
# on it while the module is being loaded.
Primary_SH = CommonSH('PrimaryDbUser')


@unittest.skipIf(3 != Primary_SH.get_node_num(), '非1+2环境不执行')
class Pubsubclass(unittest.TestCase):
    """Publication/subscription test for a table with a UNIQUE constraint.

    Cluster A ('PrimaryDbUser') acts as the publisher and cluster B
    ('remote1_PrimaryDbUser') as the subscriber.  The test provokes a
    unique-key conflict on the subscriber, checks that replication stalls,
    then removes the conflicting row and checks that the pending changes
    are applied.
    """

    def _run_pub_sql(self, sql):
        """Run *sql* on the publisher, log the output and return it."""
        result = self.commsh_pub.execut_db_sql(
            sql, sql_type=self.user_param_pub)
        self.log.info(result)
        return result

    def _run_sub_sql(self, sql):
        """Run *sql* on the subscriber, log the output and return it."""
        result = self.commsh_sub.execut_db_sql(
            sql, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result)
        return result

    def setUp(self):
        """Prepare both clusters for logical replication.

        Backs up pg_hba.conf on each cluster, passes a replication host
        entry to ``gs_guc reload`` on each side, sets ``wal_level=logical``
        on the publisher, points the subscriber's ``log_directory`` at a
        case-specific directory and restarts both clusters.
        """
        self.log = Logger()
        self.log.info("-----------this is setup-----------")
        self.log.info(f"-----{os.path.basename(__file__)[:-3]} start-----")
        self.pri_userdb_pub = Node(node='PrimaryDbUser')
        self.pri_userdb_sub = Node(node='remote1_PrimaryDbUser')
        self.constant = Constant()
        self.commsh_pub = CommonSH('PrimaryDbUser')
        self.commsh_sub = CommonSH('remote1_PrimaryDbUser')
        self.com_pub = Common()
        self.com_sub = Common('remote1_PrimaryDbUser')
        self.tb_name1 = 'tb_pubsub_case024_1'
        self.subname = "sub_case024"
        self.pubname = "pub_case024"
        self.parent_path_pub = os.path.dirname(macro.DB_INSTANCE_PATH)
        self.parent_path_sub = os.path.dirname(macro.DB_INSTANCE_PATH_REMOTE1)
        # The subscription connects to the publisher's db_port + 1.
        self.port = str(int(self.pri_userdb_pub.db_port) + 1)
        # Remember the current wal_level so tearDown can restore it.
        self.wal_level = self.com_pub.show_param("wal_level")
        self.user_param_pub = f'-U {self.pri_userdb_pub.db_user} ' \
                              f'-W {self.pri_userdb_pub.db_password}'
        self.user_param_sub = f'-U {self.pri_userdb_sub.db_user} ' \
                              f'-W {self.pri_userdb_sub.db_password}'

        # Keep a copy of each pg_hba.conf one level above the instance
        # directory; tearDown moves the copies back.
        cmd = f"cp " \
              f"{os.path.join(macro.DB_INSTANCE_PATH, 'pg_hba.conf')} " \
              f"{os.path.join(self.parent_path_pub, 'pg_hba.conf')};"
        self.log.info(cmd)
        result = self.pri_userdb_pub.sh(cmd).result()
        self.log.info(result)
        cmd = f"cp " \
              f"{os.path.join(macro.DB_INSTANCE_PATH_REMOTE1, 'pg_hba.conf')}" \
              f" {os.path.join(self.parent_path_sub, 'pg_hba.conf')};"
        self.log.info(cmd)
        result = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(result)

        # Case-specific log directory on the subscriber, so the conflict
        # message looked up in step 7 is not mixed with other logs.
        self.case_no = os.path.basename(__file__)[-6:-3]
        self.dir_new = os.path.join('$GAUSSLOG', 'pg_log', 'pg_bak',
                                    f'pub_sub_case{self.case_no}')
        # Remember the current log_directory so tearDown can restore it.
        self.log_directory = self.com_sub.show_param(
            "log_directory", macro.DB_ENV_PATH_REMOTE1)
        self.hostname = self.pri_userdb_sub.sh('hostname').result().strip()

        text = '--step:预置条件,修改pg_hba expect:成功'
        self.log.info(text)
        # Publisher: replication host entry for the subscriber, then
        # wal_level=logical followed by a restart.
        guc_res = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, '',
            f'host    replication  {self.pri_userdb_sub.db_user} '
            f'{self.pri_userdb_sub.db_host}/32 sha256')
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败:' + text)
        result = self.commsh_pub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG, 'wal_level=logical')
        self.assertTrue(result, '执行失败:' + text)
        result = self.commsh_pub.restart_db_cluster(True)
        flg = self.constant.START_SUCCESS_MSG in result or 'Degrade' in result
        self.assertTrue(flg, '执行失败:' + text)

        # Subscriber: replication host entry for the publisher, then the
        # redirected log_directory followed by a restart.
        guc_res = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
            f'host    replication  {self.pri_userdb_pub.db_user} '
            f'{self.pri_userdb_pub.db_host}/32 sha256',
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败:' + text)
        result = self.commsh_sub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG,
            f"log_directory='{self.dir_new}'",
            self.hostname, False, False,
            macro.DB_INSTANCE_PATH_REMOTE1, '',
            macro.DB_ENV_PATH_REMOTE1)
        self.assertTrue(result, '执行失败:' + text)
        result = self.commsh_sub.restart_db_cluster(True,
                                                    macro.DB_ENV_PATH_REMOTE1)
        flg = self.constant.START_SUCCESS_MSG in result or 'Degrade' in result
        self.assertTrue(flg, '执行失败:' + text)

    def test_pubsub(self):
        # Scenario: replicate a table with a UNIQUE column, provoke a
        # unique-key conflict on the subscriber, verify that replication
        # stalls, then delete the conflicting row and verify that the
        # pending changes are applied.
        # (Comments are used instead of a docstring so that unittest's
        # shortDescription() output stays unchanged.)

        # Step 1: create the same table on both clusters.
        text = '--step1:两个集群均创建表 expect:成功--'
        self.log.info(text)
        sql = f"create table {self.tb_name1}(i int primary key,  " \
            f"t text UNIQUE);"
        result = self._run_pub_sql(sql)
        # The success text is expected three times -- presumably once per
        # implicit-index notice plus the command tag; TODO confirm.
        self.assertEqual(result.count(self.constant.TABLE_CREATE_SUCCESS),
                         3, '执行失败:' + text)
        result = self._run_sub_sql(sql)
        self.assertEqual(result.count(self.constant.TABLE_CREATE_SUCCESS),
                         3, '执行失败:' + text)

        # Step 2: create the publication on cluster A.
        text = '--step2:创建发布端 expect:成功--'
        self.log.info(text)
        sql = f"CREATE PUBLICATION {self.pubname} " \
            f"FOR  TABLE {self.tb_name1}*;"
        result = self._run_pub_sql(sql)
        self.assertIn(self.constant.create_pub_succ_msg, result,
                      '执行失败:' + text)
        self.assertNotIn(self.constant.SQL_WRONG_MSG[1], result,
                         '执行失败:' + text)

        # Step 3: create the subscription on cluster B.
        text = '--step3:创建订阅 expect:成功--'
        self.log.info(text)
        result = self.commsh_sub.execute_generate(
            macro.COMMON_PASSWD, env_path=macro.DB_ENV_PATH_REMOTE1)
        # NOTE(review): '' is contained in every string, so this assertion
        # cannot fail; confirm what execute_generate returns on success
        # and assert on that instead.
        self.assertIn('', result, '执行失败:' + text)
        # NOTE(review): the connection string pairs db_user with
        # ssh_password rather than db_password -- confirm this is intended.
        # copy_data=false: no initial table copy (see the 2022/8/4 entry in
        # the file header).
        sql = f"CREATE SUBSCRIPTION {self.subname} CONNECTION " \
            f"'host={self.pri_userdb_pub.db_host} " \
            f"port={self.port} " \
            f"user={self.pri_userdb_pub.db_user} " \
            f"dbname={self.pri_userdb_pub.db_name} " \
            f"password={self.pri_userdb_pub.ssh_password}' " \
            f"PUBLICATION {self.pubname} with (copy_data=false);"
        result = self._run_sub_sql(sql)
        self.assertIn(self.constant.create_sub_succ_msg,
                      result, '执行失败:' + text)

        # Step 4: insert on cluster A; the trailing pg_sleep gives the
        # change time to reach the subscriber before step 5 queries it.
        text = '--step4:修改表数据expect:成功--'
        self.log.info(text)
        sql = f"insert into {self.tb_name1} values(1, 'first');" \
            f"select pg_sleep(5.5);"
        result = self._run_pub_sql(sql)
        self.assertIn(self.constant.INSERT_SUCCESS_MSG, result,
                      '执行失败' + text)

        # Step 5: the row must be visible on cluster B.
        text = "--step5:查询是否同步 expect:同步--"
        self.log.info(text)
        sql_select = f"select * from {self.tb_name1};"
        result = self._run_sub_sql(sql_select)
        self.assertIn('1 | first', result, '执行失败' + text)
        self.assertIn('1 row', result, '执行失败' + text)

        # Step 6: insert a row directly on cluster B whose UNIQUE column
        # value will collide with the row cluster A publishes in step 7.
        text = '--step6:集群B插入数据 expect:成功--'
        self.log.info(text)
        sql = f"insert into {self.tb_name1} values(3, 'conflict');"
        result = self._run_sub_sql(sql)
        self.assertIn(self.constant.INSERT_SUCCESS_MSG, result,
                      '执行失败' + text)

        # Step 7: insert the colliding row on cluster A.  The insert
        # succeeds locally, cluster B keeps its own rows and its log must
        # contain the duplicate-key error.
        text = "--step7:集群A插入数据 expect:成功，但集群B提示冲突--"
        self.log.info(text)
        sql = f"insert into {self.tb_name1} values(2, 'conflict');"
        result = self._run_pub_sql(sql)
        self.assertIn(self.constant.INSERT_SUCCESS_MSG, result,
                      '执行失败' + text)

        result = self._run_sub_sql(sql_select)
        self.assertIn('2 rows', result, '执行失败' + text)
        self.assertIn('3 | conflict', result, '执行失败' + text)
        self.assertIn('1 | first', result, '执行失败' + text)
        # List the log files modified in the last two minutes, newest
        # first, and take the path at the end of the first output line.
        cmd = f'source {macro.DB_ENV_PATH_REMOTE1};' \
            f'find {self.dir_new} -iname "post*" -mmin -2 | xargs ls -lta '
        self.log.info(cmd)
        result = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(result)
        log_lines = result.splitlines()
        # Fail with the step message instead of an IndexError when the
        # listing produced no output.
        self.assertTrue(log_lines, '执行失败' + text)
        self.current_log = log_lines[0].split(" ")[-1]
        flg = 'FATAL:  duplicate key value violates unique constraint'
        cmd = f'cat {self.current_log} | grep "{flg}"'
        self.log.info(cmd)
        result = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(result)
        self.assertIn(flg, result, '执行失败' + text)

        # Step 8: a further, non-conflicting insert on cluster A.
        text = '--step8:集群A再次插入数据 expect:成功--'
        self.log.info(text)
        sql = f"insert into {self.tb_name1} values(5, 'ok');"
        result = self._run_pub_sql(sql)
        self.assertIn(self.constant.INSERT_SUCCESS_MSG, result,
                      '执行失败' + text)

        # Step 9: cluster B must still show only its two original rows.
        text = "--step9:查询是否同步 expect:未更新--"
        self.log.info(text)
        result = self._run_sub_sql(sql_select)
        self.assertIn('2 rows', result, '执行失败' + text)
        self.assertIn('3 | conflict', result, '执行失败' + text)
        self.assertIn('1 | first', result, '执行失败' + text)

        # Step 10: delete the conflicting row on cluster B, wait, and
        # expect the rows published in steps 7 and 8 to appear.
        text = "--step10:删除集群B冲突数据，再查询 expect:之前操作被同步--"
        self.log.info(text)
        sql = f"delete from {self.tb_name1} where i=3;" \
            f"select pg_sleep(5.5);"
        result = self._run_sub_sql(sql)
        self.assertIn(self.constant.DELETE_SUCCESS_MSG, result,
                      '执行失败' + text)

        result = self._run_sub_sql(sql_select)
        self.assertIn('1 | first', result, '执行失败' + text)
        self.assertIn('2 | conflict', result, '执行失败' + text)
        self.assertIn('5 | ok', result, '执行失败' + text)
        self.assertIn('3 rows', result, '执行失败' + text)

    def tearDown(self):
        """Drop the test objects and restore both clusters' configuration.

        All clean-up actions, including both cluster restarts, run before
        any assertion so that a failed check cannot leave later clean-up
        steps undone.
        """
        self.log.info('------------this is tearDown-------------')
        text = '--清理环境--'
        self.log.info(text)
        drop_sub_result = self._run_sub_sql(
            f"DROP SUBSCRIPTION  {self.subname};")
        drop_pub_result = self._run_pub_sql(
            f"DROP PUBLICATION if exists {self.pubname};")
        sql = f"DROP table {self.tb_name1};"
        self._run_sub_sql(sql)
        self._run_pub_sql(sql)

        # Move the pg_hba.conf copies made in setUp back into place.
        cmd = f"mv " \
            f"{os.path.join(self.parent_path_pub, 'pg_hba.conf')} "\
            f"{os.path.join(macro.DB_INSTANCE_PATH, 'pg_hba.conf')} "
        self.log.info(cmd)
        result = self.pri_userdb_pub.sh(cmd).result()
        self.log.info(result)
        cmd = f"mv " \
            f"{os.path.join(self.parent_path_sub, 'pg_hba.conf')} "\
            f"{os.path.join(macro.DB_INSTANCE_PATH_REMOTE1, 'pg_hba.conf')} "
        self.log.info(cmd)
        result = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(result)

        # Restore the parameter values recorded in setUp.
        result = self.commsh_pub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG,
            f'wal_level={self.wal_level}')
        result1 = self.commsh_sub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG,
            f"log_directory='{self.log_directory}'",
            self.hostname, False, False,
            macro.DB_INSTANCE_PATH_REMOTE1, '',
            macro.DB_ENV_PATH_REMOTE1)
        # Restart both clusters unconditionally; the checks follow.
        self.commsh_pub.restart_db_cluster(True)
        self.commsh_sub.restart_db_cluster(True, macro.DB_ENV_PATH_REMOTE1)

        self.assertTrue(result and result1, '执行失败:' + text)
        self.assertIn(self.constant.drop_pub_succ_msg, drop_pub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.drop_sub_succ_msg, drop_sub_result,
                      '执行失败' + text)
        self.log.info(f"-----{os.path.basename(__file__)[:-3]} end-----")
